// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use chrono::DateTime;
use chrono::Utc;

pub const EMPTY_TASK_ID: u64 = 0;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ScheduleType {
    IntervalType = 0,
    CronType = 1,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum Status {
    Suspended = 0,
    Started = 1,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Scheduled = 0,
    Executing = 1,
    Succeeded = 2,
    Failed = 3,
    Cancelled = 4,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ScheduleOptions {
    pub interval: Option<i32>,
    pub cron: Option<String>,
    pub time_zone: Option<String>,
    pub schedule_type: ScheduleType,
    pub milliseconds_interval: Option<u64>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct WarehouseOptions {
    pub warehouse: Option<String>,
    pub using_warehouse_size: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub task_id: u64,
    pub task_name: String,
    pub query_text: String,
    pub when_condition: Option<String>,
    pub after: Vec<String>,
    pub comment: Option<String>,
    // expired useless
    pub owner: String,
    pub owner_user: String,
    pub schedule_options: Option<ScheduleOptions>,
    pub warehouse_options: Option<WarehouseOptions>,
    pub next_scheduled_at: Option<DateTime<Utc>>,
    pub suspend_task_after_num_failures: Option<u64>,
    // TODO
    pub error_integration: Option<String>,
    pub status: Status,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub last_suspended_at: Option<DateTime<Utc>>,
    // TODO
    pub session_params: BTreeMap<String, String>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskRun {
    pub task: Task,
    pub run_id: u64,
    pub attempt_number: i32,
    pub state: State,
    pub scheduled_at: DateTime<Utc>,
    pub completed_at: Option<DateTime<Utc>>,
    pub error_code: i64,
    pub error_message: Option<String>,
    // expired useless
    pub root_task_id: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub enum TaskMessageType {
    Execute,
    Schedule,
    Delete,
    After,
}

#[derive(Debug, Clone, PartialEq)]
pub enum TaskMessage {
    //  Execute Task immediately. If an error occurs, if it is a SQL error in the task,
    // it will be recorded in the error message of the task run.
    ExecuteTask(Task),
    //  Schedule Task will try to spawn a thread in Query to continue running according to the time set in schedule
    ScheduleTask(Task),
    // Delete the task information and try to cancel the scheduled task in the query.
    DeleteTask(String, Option<WarehouseOptions>),
    //  After Task will bind Task to the tasks in Task.afters.
    // When Execute Task is executed, after all the after tasks of Task are completed,
    // the execution will continue.
    AfterTask(Task),
}

impl TaskMessage {
    pub fn task_name(&self) -> &str {
        match self {
            TaskMessage::ExecuteTask(task)
            | TaskMessage::ScheduleTask(task)
            | TaskMessage::AfterTask(task) => task.task_name.as_str(),
            TaskMessage::DeleteTask(task_name, _) => task_name.as_str(),
        }
    }

    pub fn ty(&self) -> TaskMessageType {
        match self {
            TaskMessage::ExecuteTask(_) => TaskMessageType::Execute,
            TaskMessage::ScheduleTask(_) => TaskMessageType::Schedule,
            TaskMessage::DeleteTask(_, _) => TaskMessageType::Delete,
            TaskMessage::AfterTask(_) => TaskMessageType::After,
        }
    }

    pub fn key(&self) -> String {
        Self::key_with_type(self.ty(), self.task_name())
    }

    pub fn key_with_type(ty: TaskMessageType, task_name: &str) -> String {
        let ty_num = match ty {
            TaskMessageType::Execute => 0,
            TaskMessageType::Schedule => 1,
            TaskMessageType::Delete => 2,
            TaskMessageType::After => 3,
        };
        format!("{}-{}-{}", TaskMessage::prefix_range().0, ty_num, task_name)
    }

    /// Returns the inclusive range of key prefixes used by `TaskMessage`.
    ///
    /// This range can be used to scan all keys generated by `TaskMessage::key()`
    /// and related methods (e.g., `schedule_key`). The prefix `0` is prepended
    /// to all task-related keys to group them under the same prefix range,
    /// enabling efficient key scanning or iteration.
    ///
    /// The returned range is (0, 1), which includes all keys starting with `0-`
    /// (as produced by `TaskMessage::prefix()`), and excludes any other unrelated prefixes.
    pub fn prefix_range() -> (i64, i64) {
        (0, 1)
    }

    pub fn warehouse_options(&self) -> Option<&WarehouseOptions> {
        match self {
            TaskMessage::ExecuteTask(task)
            | TaskMessage::ScheduleTask(task)
            | TaskMessage::AfterTask(task) => task.warehouse_options.as_ref(),
            TaskMessage::DeleteTask(_, warehouse_options) => warehouse_options.as_ref(),
        }
    }
}
